﻿using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using System.Linq;
using QW.Core.Log;

namespace QW.Core.Events.Default
{
    /// <summary>
    /// 异步执行事件总线
    /// </summary>
    public class DefaultEventBus : IEventBus
    {
        private readonly Hashtable Handlers = Hashtable.Synchronized(new Hashtable());
        private readonly Dictionary<string, Type> EventMap = new Dictionary<string, Type>();

        /// <summary>
        /// 待存储队列
        /// </summary>

        private readonly ConcurrentQueue<EventData> SaveQueue = new ConcurrentQueue<EventData>();
        /// <summary>
        /// 队列集合
        /// </summary>
        private readonly ConcurrentQueue<EventData> EventQueue = new ConcurrentQueue<EventData>();

        /// <summary>
        /// 待移除队列
        /// </summary>
        private readonly ConcurrentQueue<EventData> RemoveQueue = new ConcurrentQueue<EventData>();

        private DefaultTimer timerDequeue;
        private DefaultTimer timerEnqueue;
        private DefaultTimer timerSave;
        private EventComparer comparer = new EventComparer();
        private IEventStore EventStore;
        /// <summary>
        /// .ctor
        /// </summary>
        /// <param name="eventStore"></param>
        public DefaultEventBus(IEventStore eventStore)
        {
            //事件仓储
            this.EventStore = eventStore;

            if (this.EventStore != null)
            {
                //持久化计时器,每5秒存储一次
                timerSave = new DefaultTimer(Save, 5000);

                //出队计时器,每秒出队一次
                timerDequeue = new DefaultTimer(Dequeue, 1000);

                //入队计时器,每5秒入队一次
                timerEnqueue = new DefaultTimer(Enqueue, 5000);
            }
        }
        /// <summary>
        /// 发布事件异步
        /// </summary>
        /// <typeparam name="TEvent"></typeparam>
        /// <param name="e"></param>
        /// <returns></returns>
        public async Task PublishAsync<TEvent>(TEvent e)
           where TEvent : class, IEvent
        {
            var eventType = typeof(TEvent);
            var eventName = eventType.FullName;

            if (!Handlers.ContainsKey(eventName))
                return;//当前事件无相关订阅者

            var detail = Handlers[eventName] as EventDetail;
            if (detail.IsDelay)
            {
                var data = EventConvert.ToEventData(e);
                SaveQueue.Enqueue(data);
            }
            else
            {
                await PublishAsync(e, detail);
            }
        }
        //TODO:DZY[20200608]异步有时候报外部错误，具体问题暂未查到，粗略猜测是await有问题
        private async Task PublishAsync<TEvent>(TEvent e, EventDetail detail)
             where TEvent : class, IEvent
        {
            await Task.Factory.StartNew(() =>
            {
                foreach (var handler in detail.Handlers)
                {
                    try
                    {
                        handler.Handler(null, new[] { e });
                    }
                    catch (Exceptions.QWBaseBusinessException ex)
                    {
                        //忽略业务异常
                    }
                    catch (Exception ex)
                    {
                        LogHandler.Error("事件处理异常:" + detail.EventName + "\r\n" + JsonConvert.SerializeObject(e), ex);
                    }
                }
            });
        }

        /// <summary>
        /// 发布事件同步
        /// </summary>
        /// <typeparam name="TEvent"></typeparam>
        /// <param name="e"></param>
        /// <returns></returns>
        public void Publish<TEvent>(TEvent e)
           where TEvent : class, IEvent
        {
            var eventType = typeof(TEvent);
            var eventName = eventType.FullName;

            if (!Handlers.ContainsKey(eventName))
                return;//当前事件无相关订阅者

            var detail = Handlers[eventName] as EventDetail;
            if (detail.IsDelay)
            {
                var data = EventConvert.ToEventData(e);
                SaveQueue.Enqueue(data);
            }
            else
            {
                Publish(e, detail);
            }
        }

        private void Publish<TEvent>(TEvent e, EventDetail detail)
             where TEvent : class, IEvent
        {
            foreach (var handler in detail.Handlers)
            {
                try
                {
                    handler.Handler(null, new[] { e });
                }
                catch (Exceptions.QWBaseBusinessException ex)
                {
                    //忽略业务异常
                }
                catch (Exception ex)
                {
                    LogHandler.Error("事件处理异常:" + detail.EventName + "\r\n" + JsonConvert.SerializeObject(e), ex);
                }
            }
        }
        /// <summary>
        /// 订阅
        /// </summary>
        /// <param name="detail"></param>
        public void Subscribe(EventDetail detail)
        {
            var eventName = detail.EventName;
            if (!Handlers.ContainsKey(eventName))
            {
                EventMap.Add(eventName, detail.TypeInfo);
                Handlers.Add(eventName, detail);
            }
        }

        /// <summary>
        /// 获取注册中的事件处理程序集
        /// </summary>
        /// <param name="name">事件名称,默认空 表示取所有</param>
        /// <returns></returns>
        public Dictionary<string, string> GetEventHandlers(string name = "")
        {
            Dictionary<string, string> result = new Dictionary<string, string>();
            foreach (var item in EventMap)
            {
                bool can_add = true;
                if (!string.IsNullOrWhiteSpace(name) && item.Key != name)
                {
                    can_add = false;
                }
                if (can_add)
                {
                    result.Add(item.Key, item.Value.FullName);
                }
            }
            return result;
        }
        /// <summary>
        /// 保存
        /// </summary>
        private void Save()
        {
            var save = new List<EventData>();
            EventData item;
            while (SaveQueue.TryDequeue(out item))
            {
                save.Add(item);
                if (save.Count >= 1000)
                {
                    EventStore.Save(save);
                    save.Clear();
                }
            }
            if (save.Count > 0)
                EventStore.Save(save);
        }

        private void Enqueue()
        {
            var remove = new List<Guid>();
            EventData item1;
            while (RemoveQueue.TryDequeue(out item1))
                remove.Add(item1.EventId);
            EventStore.Remove(remove);

            var end = DateTime.Now.AddSeconds(10);
            var events = EventStore.Pull(end, 10000);
            foreach (var item in events)
            {
                if (!(EventQueue.Contains(item, comparer) || RemoveQueue.Contains(item, comparer)))
                    EventQueue.Enqueue(item);//入队
            }
        }

        private async void Dequeue()
        {
            EventData eventData = null;
            while (true)
            {
                if (EventQueue.IsEmpty)
                    return;//队列为空

                if (!EventQueue.TryPeek(out eventData))
                    return;//尝试取出队首为空

                if (eventData.TriggerTime > DateTime.Now)
                    return;//未到事件事件

                if (!EventQueue.TryDequeue(out eventData))
                    return;//队列数据取出失败

                RemoveQueue.Enqueue(eventData);

                var eventName = eventData.EventName;
                var eventType = EventMap[eventName];
                var eventItem = EventConvert.ToEvent(eventData, eventType);
                if (!Handlers.ContainsKey(eventName))
                    continue;//当前事件无相关订阅者

                var detail = Handlers[eventName] as EventDetail;
                await PublishAsync(eventItem as IEvent, detail);

            }
        }
        /// <summary>
        /// 
        /// </summary>
        public void Dispose()
        {
            Handlers.Clear();
        }
    }
}
